package com.mimo.common.utils;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
 * 该工具主要用于解决流式数据下的数据合并，并最终通过批处理的方式，消费其中的数据。
 * <p>
 * etc.
 * <li>给同一个用户发起100次的充值动作，对于系统来说，可能需要触发100 IO请求。而通过该工具可以把100次的充值动作，合并成一批，最终再一次性消费掉这100个充值动作。从而使得IO交互从100变成1.
 * <li>引入分组概念。还是以充值动作为例，100个不同的用户，各自充值100次，正常可能需要100*100=10K次的请求。通过分组，可以基于单个用户维度合并其下的100个充值动作，从而使用100个用户，最终也就是只产生100次IO请求。
 * 
 * @author Hongyu
 * @param <E>
 * @param <K>
 */
public class BatchExecutor<E, K> implements Closeable {

  private static final Logger log = LoggerFactory.getLogger(BatchExecutor.class);

  private static final int DEFAULT_INITIAL_CAPACITY = 128;

  private static final long MIN_INTERVAL = 500;

  private volatile boolean running;

  private final ScheduledExecutorService scheduledThreadPool;

  /**
   * 用于检查整个组的数量上下文
   */
  private final Object globalLock = new Object();

  private final Map<K, ReentrantLock> locks;

  private final Map<K, Collection<E>> holder;

  /**
   * 从E中获得K的维度
   */
  private final Function<E, K> keyExtractor;

  private final Consumer<Collection<E>> executer;

  /**
   * 定时任务执行间隔
   */
  private final long interval;

  /**
   * 用于定义每个分组里的最大元素数量
   */
  private final long maxElementsPerGroup;

  /**
   * 维护的最大分组数量
   */
  private final long maxGroups;

  public BatchExecutor(Function<E, K> keyExtractor, Consumer<Collection<E>> executer, Duration val,
      long maxElementsPerGroup, long maxGroups) {
    Objects.requireNonNull(keyExtractor, "keyExtractor cannot be null");
    Objects.requireNonNull(executer, "executer cannot be null");

    this.interval = val.toMillis();
    Assert.isTrue(interval >= MIN_INTERVAL, String.format("时间间隔不能少于 %s ms", MIN_INTERVAL));

    this.maxElementsPerGroup = maxElementsPerGroup;
    this.maxGroups = maxGroups;

    this.keyExtractor = keyExtractor;
    this.executer = executer;

    this.locks = new ConcurrentHashMap<>(DEFAULT_INITIAL_CAPACITY);
    this.holder = new ConcurrentHashMap<>(DEFAULT_INITIAL_CAPACITY);

    scheduledThreadPool = Executors.newScheduledThreadPool(1);
    scheduledThreadPool.scheduleWithFixedDelay(consumeTasks(), this.interval, this.interval, TimeUnit.MILLISECONDS);

    // 标记运行状态
    running = true;
  }

  /**
   * 一个基准构造器, 批提交超时为3秒， 最大组数为1000， 单组元素最多为50
   * 
   * @param keyExtractor
   * @param executer
   */
  public BatchExecutor(Function<E, K> keyExtractor, Consumer<Collection<E>> executer) {
    this(keyExtractor, executer, Duration.ofSeconds(3), 50, 1000);
  }

  /**
   * 该类的核心方法，也是所有数据的来源和入口
   * <p>
   * 如果追加的element无法解得K维度, 则抛出异常 {@link IllegalArgumentException} or {@link IllegalStateException}. 业务调用方需要做特定的处理
   * 
   * @param element
   * @throws IllegalArgumentException
   *           如果加入的元素无法获得对应的key值，抛出该异常
   * @throws IllegalStateException
   *           组件状态为停止时，尝试加入元素会抛异常
   */
  public void put(E element) {
    if (!isActive()) {
      throw new IllegalStateException("组件状态已经停止");
    }

    K key = null;
    try {
      key = keyExtractor.apply(element);
      Objects.requireNonNull(key, "element 维度结果不能为空");
    } catch (Exception ex) {
      throw new IllegalArgumentException("element 维度解析异常", ex);
    }

    ReentrantLock lock;
    // 组数量超过的,进行清洗
    synchronized (globalLock) {
      log.debug("current groups size:{} , maxGroups:{} , elements:{} ", this.holder.size(), this.maxGroups,
          this.holder);
      if (this.holder.size() >= this.maxGroups) {
        this.consumeTasks().run();
        log.debug("after clean -->current groups size:{} , maxGroups:{} , elements:{} ", this.holder.size(),
            this.maxGroups, this.holder);
        lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
          this.holder.computeIfAbsent(key, k -> new LinkedList<>());
        } finally {
          lock.unlock();
        }
      }
    }

    // 控制组内元素数量
    lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
    lock.lock();
    Collection<E> es = this.holder.computeIfAbsent(key, k -> new LinkedList<>());
    try {
      es.add(element);

      // 组内元素数量检查
      log.debug("current group[{}] size:{} , maxElementsPerGroup:{} , elements:{} ", key, es.size(),
          this.maxElementsPerGroup, es);
      if (es.size() >= maxElementsPerGroup) {
        // 通知消费者消费对应数据
        this.submit(es);

        log.debug("after --> current group[{}] size:{} , maxElementsPerGroup:{} , elements:{} ", key, es.size(),
            this.maxElementsPerGroup, es);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * 用于消费任务数据
   * <p>
   * 此次需要特别强调,由于批处理是目标，以增大吞吐量，当该方法被调用时，会立马消费掉所有桶中的数据，不管是否满足单个桶的最大水位线。 <br>
   * 在清除桶中数据之后，会立马从当前池中把桶也移除，以保证那一刻，池中的桶都不再占用位置了
   */
  private Runnable consumeTasks() {
    return () -> {
      synchronized (globalLock) {
        Iterator<Entry<K, Collection<E>>> holderIterator = holder.entrySet().iterator();
        while (holderIterator.hasNext()) {
          Entry<K, Collection<E>> e = holderIterator.next();
          holderIterator.remove();

          ReentrantLock lock = locks.computeIfAbsent(e.getKey(), k -> new ReentrantLock());
          lock.lock();
          try {
            if (!CollectionUtils.isEmpty(e.getValue())) {
              this.submit(e.getValue());
            }
          } finally {
            // 以下二行代码的顺序可千万不能调转,否则在一些特殊的竞态环境下，会引发意想不到的事情
            locks.remove(e.getKey());
            lock.unlock();
          }
        }
      }
    };
  }

  // 对分组数据进行消费
  private void submit(Collection<E> es) {
    try {
      this.executer.accept(es);
    } catch (Exception e) {
      log.error("消费组数据失败", es);
    } finally {
      log.debug("完成消费元素:{}", es);
      es.clear();
    }
  }

  /**
   * 判断当前组件是否运行中
   * 
   * @return
   */
  public boolean isActive() {
    return this.running;
  }

  @Override
  public void close() throws IOException {
    running = false;

    // 停止定时任务调度
    scheduledThreadPool.shutdownNow();

    // 将当前所有数据尽可能刷出去
    this.consumeTasks().run();

    this.locks.clear();
    this.holder.clear();
  }
}
